  1. Kafka
  2. KAFKA-12336

Custom stream naming does not work when calling the stream[K, V](topicPattern: Pattern) API with a named Consumed parameter

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: 2.7.0
    • Fix Version/s: 2.8.1, 3.0.0
    • Component/s: streams

    Description

      In our Scala application I am trying to implement custom naming for the nodes of a Kafka Streams topology.

      We use a topic pattern for our stream source.

      This is the API I am calling:

       

      val topicsPattern="t-[A-Za-z0-9-].suffix"
      val operations: KStream[MyKey, MyValue] =
        builder.stream[MyKey, MyValue](Pattern.compile(topicsPattern))(
          Consumed.`with`[MyKey, MyValue].withName("my-fancy-name")
        )
      

       Although I provide Consumed with a custom name, the topology description still shows "KSTREAM-SOURCE-0000000000" as the name of our stream source.

      This is not a problem when I pass a plain topic name. However, our application needs to consume messages from a set of topics selected by topic-name pattern matching.

      After checking the Kafka code, I see that

      org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder (on line 103) has a bug:

      public <K, V> KStream<K, V> stream(final Pattern topicPattern,
                                         final ConsumedInternal<K, V> consumed) {
          final String name = newProcessorName(KStreamImpl.SOURCE_NAME);
          final StreamSourceNode<K, V> streamPatternSourceNode = new StreamSourceNode<>(name, topicPattern, consumed);
      

      The node name construction does not take the name of the consumed parameter into account.

      For example, the code for the other stream API overload, which takes topic names, does it correctly:

      final String name = new NamedInternal(consumed.name()).orElseGenerateWithPrefix(this, KStreamImpl.SOURCE_NAME);
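
      To make the difference between the two code paths concrete, here is a minimal, self-contained sketch. It is not the Kafka source: the class SourceNaming and its methods patternSourceNameAsReported and sourceNameHonoringConsumed are hypothetical stand-ins, and the zero-padded counter format is assumed from the "KSTREAM-SOURCE-0000000000" name seen in the topology description.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified model of the naming logic discussed in this issue.
// It only illustrates the two behaviours; it is not the Kafka implementation.
final class SourceNaming {
    static final String SOURCE_NAME = "KSTREAM-SOURCE-";

    private final AtomicInteger index = new AtomicInteger(0);

    // Generated name: prefix plus a zero-padded counter (format assumed).
    String newProcessorName(final String prefix) {
        return prefix + String.format("%010d", index.getAndIncrement());
    }

    // Behaviour reported for the Pattern overload: the supplied name is ignored.
    String patternSourceNameAsReported(final String consumedName) {
        return newProcessorName(SOURCE_NAME);
    }

    // Behaviour of the topic-name overload: use the supplied name if present,
    // otherwise fall back to a generated one.
    String sourceNameHonoringConsumed(final String consumedName) {
        return consumedName != null ? consumedName : newProcessorName(SOURCE_NAME);
    }

    public static void main(final String[] args) {
        // prints "KSTREAM-SOURCE-0000000000"
        System.out.println(new SourceNaming().patternSourceNameAsReported("my-fancy-name"));
        // prints "my-fancy-name"
        System.out.println(new SourceNaming().sourceNameHonoringConsumed("my-fancy-name"));
        // prints "KSTREAM-SOURCE-0000000000"
        System.out.println(new SourceNaming().sourceNameHonoringConsumed(null));
    }
}
```

      Under these assumptions, applying the same name-or-generate fallback to the Pattern overload would presumably make the custom name show up in the topology description.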
      

       

            People

              Assignee: Geordie GeordieMai
              Reporter: ramazanyich Ramil Israfilov